{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 202 - Training and Evaluating CNTK Models in Spark ML Pipelines\n",
    "\n",
    "Yet again, now using the `Word2Vec` Estimator from Spark.  We can use the tree-based\n",
    "learners from spark in this scenario due to the lower dimensionality representation of\n",
    "features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import mmlspark\n",
    "from pyspark.sql.types import IntegerType, StringType, StructType, StructField"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataFile = \"BookReviewsFromAmazon10K.tsv\"\n",
    "textSchema = StructType([StructField(\"rating\", IntegerType(), False),\n",
    "                         StructField(\"text\", StringType(), False)])\n",
    "import os, urllib\n",
    "if not os.path.isfile(dataFile):\n",
    "    urllib.request.urlretrieve(\"https://mmlspark.azureedge.net/datasets/\"+dataFile, dataFile)\n",
    "data = spark.createDataFrame(pd.read_csv(dataFile, sep=\"\\t\", header=None), textSchema)\n",
    "data.limit(10).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Modify the label column to predict a rating greater than 3."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "processedData = data.withColumn(\"label\", data[\"rating\"] > 3) \\\n",
    "                    .select([\"text\", \"label\"])\n",
    "processedData.limit(5).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Split the dataset into train, test and validation sets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use `Tokenizer` and `Word2Vec` to generate the features."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.feature import Tokenizer, Word2Vec\n",
    "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"words\")\n",
    "partitions = train.rdd.getNumPartitions()\n",
    "word2vec = Word2Vec(maxIter=4, seed=42, inputCol=\"words\", outputCol=\"features\",\n",
    "                    numPartitions=partitions)\n",
    "textFeaturizer = Pipeline(stages = [tokenizer, word2vec]).fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Transform each of the train, test and validation datasets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ptrain = textFeaturizer.transform(train).select([\"label\", \"features\"])\n",
    "ptest = textFeaturizer.transform(test).select([\"label\", \"features\"])\n",
    "pvalidation = textFeaturizer.transform(validation).select([\"label\", \"features\"])\n",
    "ptrain.limit(5).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Generate several models with different parameters from the training data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier\n",
    "from mmlspark.TrainClassifier import TrainClassifier\n",
    "import itertools\n",
    "\n",
    "lrHyperParams       = [0.05, 0.2]\n",
    "logisticRegressions = [LogisticRegression(regParam = hyperParam)\n",
    "                       for hyperParam in lrHyperParams]\n",
    "lrmodels            = [TrainClassifier(model=lrm, labelCol=\"label\").fit(ptrain)\n",
    "                       for lrm in logisticRegressions]\n",
    "\n",
    "rfHyperParams       = itertools.product([5, 10], [2, 3])\n",
    "randomForests       = [RandomForestClassifier(numTrees=hyperParam[0], maxDepth=hyperParam[1])\n",
    "                       for hyperParam in rfHyperParams]\n",
    "rfmodels            = [TrainClassifier(model=rfm, labelCol=\"label\").fit(ptrain)\n",
    "                       for rfm in randomForests]\n",
    "\n",
    "gbtHyperParams      = itertools.product([8, 16], [2, 3])\n",
    "gbtclassifiers      = [GBTClassifier(maxBins=hyperParam[0], maxDepth=hyperParam[1])\n",
    "                       for hyperParam in gbtHyperParams]\n",
    "gbtmodels           = [TrainClassifier(model=gbt, labelCol=\"label\").fit(ptrain)\n",
    "                       for gbt in gbtclassifiers]\n",
    "\n",
    "trainedModels       = lrmodels + rfmodels + gbtmodels"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Find the best model for the given test dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark import FindBestModel\n",
    "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=trainedModels).fit(ptest)\n",
    "bestModel.getEvaluationResults().show()\n",
    "bestModel.getBestModelMetrics().show()\n",
    "bestModel.getAllModelMetrics().show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get the accuracy from the validation dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark.ComputeModelStatistics import ComputeModelStatistics\n",
    "predictions = bestModel.transform(pvalidation)\n",
    "metrics = ComputeModelStatistics().transform(predictions)\n",
    "print(\"Best model's accuracy on validation set = \"\n",
    "      + \"{0:.2f}%\".format(metrics.first()[\"accuracy\"] * 100))\n",
    "print(\"Best model's AUC on validation set = \"\n",
    "      + \"{0:.2f}%\".format(metrics.first()[\"AUC\"] * 100))"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
